package com.xiaomaoguai.fcp.pre.kepler.glue.repository.spi;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.xiaomaoguai.fcp.pre.kepler.glue.autoconfiguration.GlueBootstrap;
import com.xiaomaoguai.fcp.pre.kepler.glue.model.GlueClass;
import com.xiaomaoguai.fcp.pre.kepler.glue.annotation.GlueSPI;
import com.xiaomaoguai.fcp.pre.kepler.glue.config.GlueConfig;
import com.xiaomaoguai.fcp.pre.kepler.glue.config.GlueZookeeperConfig;
import com.xiaomaoguai.fcp.pre.kepler.glue.repository.GlueRepository;
import com.xiaomaoguai.fcp.pre.kepler.glue.utils.SpringContextHolder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author WeiHui-Z
 * @version v1.0.0
 * @date 2019/9/23 19:54
 * @since JDK 1.8
 */
@Slf4j
@GlueSPI("zookeeper")
public class ZookeeperGlueRepository implements GlueRepository {

	/**
	 * 线程工厂
	 */
	private ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("PathNodeListener-%s").build();

	private GlueBootstrap glueBootstrap;

	/**
	 * init. 初始化，比如连接zk
	 *
	 * @param glueConfig {@linkplain com.xiaomaoguai.fcp.pre.kepler.glue.config.GlueConfig}
	 */
	@Override
	public void init(GlueConfig glueConfig) throws Exception {
		this.glueBootstrap = SpringContextHolder.getBean(GlueBootstrap.class);

		GlueZookeeperConfig glueZookeeperConfig = glueConfig.getGlueZookeeperConfig();
		String listenPath = glueZookeeperConfig.getListenPath();
		CuratorFramework curatorFramework = curatorFramework(glueZookeeperConfig);

		if (curatorFramework != null && StringUtils.isNotBlank(listenPath)) {
			//配置节点的监听类
			TreeCache treeCache = new TreeCache(curatorFramework, listenPath);
			treeCache.getListenable().addListener((client, treeCacheEvent) -> {
				ChildData childData = treeCacheEvent.getData();
				TreeCacheEvent.Type type = treeCacheEvent.getType();
				//过滤数据为空的事件
				if (childData == null || childData.getData() == null) {
					log.debug("event received, type: {} , data was  null , discard this", type);
					return;
				}
				//计算path
				String path = childData.getPath();
				//计算数据
				String data = new String(childData.getData(), StandardCharsets.UTF_8);
				if (StringUtils.isBlank(data)) {
					log.debug("event received, type: {} , data was  blank , discard this", type);
					return;
				}
				String glueName = StringUtils.substringAfterLast(path, "/");
				GlueClass glueClass = new GlueClass(glueName, data);
				switch (type) {
					case NODE_ADDED:
						ZookeeperGlueRepository.this.glueBootstrap.addOrUpdateGlueClass(glueName, glueClass);
						break;
					case NODE_UPDATED:
						ZookeeperGlueRepository.this.glueBootstrap.addOrUpdateGlueClass(glueName, glueClass);
						break;
					case NODE_REMOVED:
						ZookeeperGlueRepository.this.glueBootstrap.removeGlueClass(glueName);
						break;
					default:
						log.error("unknown message type: " + type);
				}
			}, asyncExecutor());
			treeCache.start();
		}
	}

	/**
	 * 初始化zk连接
	 *
	 * @param glueZookeeperConfig zk配置
	 * @return CuratorFramework
	 */
	private CuratorFramework curatorFramework(GlueZookeeperConfig glueZookeeperConfig) {
		log.info("zookeeper registry center init, server lists is: {}.", glueZookeeperConfig.getServerLists());
		CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
				.threadFactory(threadFactory)
				.connectString(glueZookeeperConfig.getServerLists())
				.retryPolicy(new ExponentialBackoffRetry(
						glueZookeeperConfig.getBaseSleepTimeMilliseconds(),
						glueZookeeperConfig.getMaxRetries(),
						glueZookeeperConfig.getMaxSleepTimeMilliseconds()))
				.namespace(glueZookeeperConfig.getNamespace());
		if (0 != glueZookeeperConfig.getSessionTimeoutMilliseconds()) {
			builder.sessionTimeoutMs(glueZookeeperConfig.getSessionTimeoutMilliseconds());
		}
		if (0 != glueZookeeperConfig.getConnectionTimeoutMilliseconds()) {
			builder.connectionTimeoutMs(glueZookeeperConfig.getConnectionTimeoutMilliseconds());
		}
		if (StringUtils.isNotBlank(glueZookeeperConfig.getDigest())) {
			builder.authorization("digest", glueZookeeperConfig.getDigest().getBytes(StandardCharsets.UTF_8))
					.aclProvider(new ACLProvider() {

						@Override
						public List<ACL> getDefaultAcl() {
							return ZooDefs.Ids.CREATOR_ALL_ACL;
						}

						@Override
						public List<ACL> getAclForPath(final String path) {
							return ZooDefs.Ids.CREATOR_ALL_ACL;
						}
					});
		}
		CuratorFramework curatorFramework = builder.build();
		curatorFramework.start();
		try {
			if (!curatorFramework.blockUntilConnected(glueZookeeperConfig.getMaxSleepTimeMilliseconds() * glueZookeeperConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
				curatorFramework.close();
				throw new KeeperException.OperationTimeoutException();
			}
		} catch (Exception e) {
			log.error("zk exception", e);
		}
		return curatorFramework;
	}

	private Executor asyncExecutor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(5);
		executor.setMaxPoolSize(10);
		executor.setQueueCapacity(200);
		ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
		executor.setRejectedExecutionHandler(callerRunsPolicy);
		executor.setThreadNamePrefix("zkAsyncExecutor-");
		executor.initialize();
		return executor;
	}

}
